Vertex AI Pipeline で既に存在する Artifact を返す
過去に作成した Artifact をまた使いたい
再利用できると同じ ML Metadata から Pipeline やその他もろもろを辿れる
今のところあまり良くはないがこうする
Importer に任せる
metadata 変えたくなったら別の Artifact になってしまうのを諦める
やりたいこと
データ前処理パイプラインコンポーネントに、欲しいデータを要求する
env: str, target_id: str 的な、属性値で要求する、環境 hoge の id fuga のやつくれ
resolve コンポーネント
env, target_id から gs://... を解決する(定まるようにしておく)
ファイルがなければ初期化フローを実行して配置する
metadata に設定される値を一通り返す
impoter コンポーネント
dsl.importer(..., reimport=False) で resolve_task.output を参照する
実装例
code:import_pipeline.py
from typing import NamedTuple
from kfp import dsl
@dsl.component(
base_image="python:3.11",
)
def load_data_if_needed(env: str, id: str, file: str) -> NamedTuple( # type: ignore
"Outputs",
uri=str,
origin_uri=str,
content_type=str,
):
# ここでの前処理は別の GCS バケットからデータをコピーするだけとする
target_uri = f"gs://my-current-project-bucket/imported-data/{env}/{id}/{file}"
origin_uri = f"gs://other-origin-project-bucket/...."
client = storage.Client()
t_bucket, t_name = target_uri.split("/", 3)2: o_bucket, o_name = origin_uri.split("/", 3)2: # target に無ければコピー
if not storage.Blob.from_string(target_uri, client).exists():
client.bucket(o_bucket).copy_blob(
client.bucket(o_bucket).blob(o_name),
client.bucket(t_bucket),
t_name,
)
blob_meta = client.bucket(t_bucket).get_blob(t_name)
from collections import namedtuple
return Outputs(
target_uri,
origin_uri,
blob_meta.content_type,
)
@dsl.pipeline(name="import-data")
def pipeline(env: str, id: str, file: str) -> dsl.Dataset:
load_task = load_data_if_needed(env=env, id=id, file=file)
importer = dsl.importer(
artifact_class=dsl.Dataset,
artifact_uri=load_task.outputs"uri", metadata={
"origin_name": "other-origin-project",
},
reimport=False,
)
return importer.output
---.icon
以下は Import 周りの試行錯誤のログ
Impoter を使ってがんばる
自分で Artifact インスタンス作って返す
の2通りを検討
Importer を使う例
再利用というより既に外部にあるリソースを Artifact として参照する例
Define a pipeline that uses your components and the Importer のあたり
code:importer.py
# pipeline で import する
importer = kfp.dsl.importer(
artifact_uri="gs://ml-pipeline-playground/shakespeare1.txt",
artifact_class=dsl.Dataset,
reimport=False,
)
# importer.output で参照する
comp1_task = comp1(imported=importer.output)
実行時にファイルが都度 gcs へコピーされたりはしない、pipeline の入出力の値部分だけ gcs に書かれる
reimport=False があれば既に ML Metadata に存在する場合作成されない
既に存在するかどうかのチェックは uri だけでなく schema_title や metadata 含めたものになる...と思う
別の component の返り値を artifact_uri に指定できる
metadata も指定できるが、DSL の都合上まとめてはできない...
code:importer_with_metadata.py
pretask = ...
# これは問題ない
importer = dsl.importer(
...,
metadata={"key1": pretask.outputs'key1', "key2": pretask.outputs'key2'}, )
# こういうのはだめ、outputs の各値は PipelineArtifactChannel だから
importer = dsl.importer(..., metadata=pretask.outputs'metadata') importer = dsl.importer(..., metadata={**pretask.outputs'metadata'}) 自分で ML Metadata 引いて Artifact を返す
Importer 使わないパターン
これはゴミができるか、都度新しい Artifact になってしまう
code:importer_artifact.py
...
import google.cloud.aiplatform as aip
try:
artifact = aip.Artifact.get_with_uri(target_gcs_uri)
except ValueError:
artifact = aip.Artifact.create(
schema_title="system.Artifact",
display_name="check_synced",
uri=target_gcs_uri,
metadata={"source_uri": origin_gcs_uri},
)
return dsl.Artifact(
name=artifact.resource_name,
uri=target_gcs_uri,
)
これは同じ Artifact が返るが、都度ゴミの Artifact が1個作られる
上は pipeline が実行段階で作ってた? もの、実行前にパイプライン作成段階で見えるやつ
下は自分で作ったもの
https://gyazo.com/44cc324dbe2f970f674a7bcb02bfead0
component が Artifact を返すと DAG から分かった段階で空の要素が作られてる
自分で Artifact を作るのではなく、component 引数の Output[Artifact] に値をセットする方式
ゴミは出ないが毎回新しい Artifact になってしまう、まあ DAG の段階で分かって id 払い出してるんだな
code:return_existing.py
def check_synced(
target_gcs_uri: str, origin_gcs_uri: str, file: dsl.Outputdsl.Dataset ):
...
import google.cloud.aiplatform as aip
try:
artifact = aip.Artifact.get_with_uri(target_gcs_uri)
except ValueError:
artifact = aip.Artifact.create(
schema_title="system.Artifact",
display_name="check_synced",
uri=target_gcs_uri,
metadata={"source_uri": origin_gcs_uri},
)
file.uri = artifact.uri
file.metadata = artifact.metadata
無ければ Artifact を作成 & 登録してそれを使いたい
あれば importer の返す値を使う、的なことをしたいのだが
うーん
名前が importer.artifact になったり metadata 作りづらいけど importer のほうがいいかなあ
PipelineArtifactChannel も対応してくれや
いやでも ParameterChannel を受ける artifact_uri に outputs 渡せて、metadata がだめなのへんな気がする
そしてこれは通るから、pipeline 側のコンパイル時に何か足りないのでは
code:koreha_ok.py
importer = dsl.importer(
artifact_class=dsl.Dataset,
artifact_uri=check_synced_task.outputs"uri", metadata=check_synced_task.outputs, # これはいけるんだよね、json 文字列になるけど
reimport=False,
)
怒られてるのはここ
metadata_protobuf_struct.update(task.importer_spec.metadata)
protobuf にできない? AttributeError: 'str' object has no attribute 'items'
動かない時 task.importer_spec.metadata は "{{$.inputs.parameters['metadata']}}"
動く時はこういうやつ {'foo': "{{$.inputs.parameters['metadata']}}"} で dict
ここは本来 dict が入ってるべき
task.importer_spec.artifact_uri はこういうやつ
{{channel:task=check-synced;name=uri;type=String;}}
これは PipelineParameterChannel の文字列表現か、runtime_parameter = uri している
metadata も runtime にできない?
うーん outputs['metadata'] のより詳細な型が分かって扱えないとだめか?
task.importer_spec.metadata が dict のときだけ protobuf にすればいいのでは
素通ししてみるとコンパイルは通るし実行できるけど metadata の設定はされないね
https://gyazo.com/18daedcbe1874a8a666fc7afbe648320
DAG 上では謎なことが起きている、Importer の出力と、下の transcribe に接続する2つの出力は同じ Artifact なのだが...